package com.xbb.kafka.Interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * Producer interceptor that counts how many records were acknowledged
 * successfully and how many failed, and prints both totals on {@link #close()}.
 *
 * <p>Records are passed through {@link #onSend(ProducerRecord)} unmodified.
 *
 * <p>Thread-safety: the Kafka {@code ProducerInterceptor} documentation states
 * that callbacks may be invoked from multiple threads, so counter updates and
 * the final read in {@link #close()} are guarded by a private lock. The counter
 * fields are additionally {@code volatile} so that code reading them directly
 * sees the latest value.
 */
public class CounterInterceptor implements ProducerInterceptor<String, String> {

  /** Guards the read-modify-write updates of {@link #success} and {@link #fail}. */
  private final Object counterLock = new Object();

  /** Number of acknowledgements that carried metadata and no exception. */
  volatile int success = 0;

  /** Number of acknowledgements that carried an exception or no metadata. */
  volatile int fail = 0;

  /**
   * Returns the record unchanged; this interceptor only counts outcomes.
   *
   * @param record the record about to be sent
   * @return the same {@code record} instance
   */
  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    return record;
  }

  /**
   * Increments the success or failure counter for one acknowledged record.
   *
   * <p>A send counts as successful only when {@code exception} is {@code null}
   * and {@code metadata} is present. Checking {@code metadata} alone is not
   * enough: per the {@code ProducerInterceptor} contract, a failed send may
   * still supply a non-null {@code metadata} (carrying only topic/partition)
   * together with a non-null {@code exception}.
   *
   * @param metadata metadata of the record that was sent; may be {@code null}
   * @param exception the error raised while processing the record, or
   *     {@code null} if no error occurred
   */
  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    final boolean delivered = exception == null && metadata != null;
    // ++ on a volatile int is not atomic, so serialize the update.
    synchronized (counterLock) {
      if (delivered) {
        success++;
      } else {
        fail++;
      }
    }
  }

  /**
   * Prints the accumulated success and failure totals to standard output.
   */
  @Override
  public void close() {
    final int successTotal;
    final int failTotal;
    // Snapshot both counters under the lock so the printed pair is consistent.
    synchronized (counterLock) {
      successTotal = success;
      failTotal = fail;
    }
    System.out.println("success = " + successTotal);
    System.out.println("fail = " + failTotal);
  }

  /**
   * No configuration is read by this interceptor.
   *
   * @param configs the producer configuration; ignored
   */
  @Override
  public void configure(Map<String, ?> configs) {
    // Intentionally empty: nothing to configure.
  }
}
